package ro.pub.cs.pp.pdad;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ComponentType {

	public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
		
		private final static IntWritable one = new IntWritable(1); 
		private final static IntWritable zero = new IntWritable(0); 
		
		private int extractComponentType(String line) {
			String[] pieces = line.split("\\s+"); 
			return Integer.parseInt(pieces[6]); 
		}
		
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			String line = value.toString(); 
			try { 
				int componentType = extractComponentType(line);
				context.write(new IntWritable(componentType), one);
			} catch (NumberFormatException ex) {
				context.write(zero, zero); 
			}
			
		}
	}
	
	public static class Reduce extends Reducer <IntWritable, IntWritable, Text, IntWritable> {
		
		public static String[] compTypeMap = {"host", "CPU", "IO", "network", "memory", "software"}; 
		private Text compType = new Text(); 
		
		public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			int sum = 0; 
			for (IntWritable cnt : values) {
				sum += cnt.get(); 
			}
			compType.set(compTypeMap[key.get()]); 
			context.write(compType, new IntWritable(sum));  
		}
	}
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration(); 
		Job job = new Job(conf, "ComponentType");
		job.setNumReduceTasks(5);
		job.setJarByClass(ComponentType.class); 
		job.setOutputKeyClass(IntWritable.class); 
		job.setOutputValueClass(IntWritable.class); 
		job.setMapperClass(Map.class);
		/* don't set any combiner class */
		job.setReducerClass(Reduce.class); 		
		FileInputFormat.addInputPath(job, new Path(args[0])); 
		FileOutputFormat.setOutputPath(job, new Path(args[1])); 
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
	
}
